跳到主要内容

Java 多线程-阻塞队列相关工具类

阻塞队列的由来

参考资料 第十三章 阻塞队列

假设一种场景,生产者一直生产资源,消费者一直消费资源,资源存储在一个缓冲池中,生产者将生产的资源存进缓冲池中,消费者从缓冲池中拿到资源进行消费,这就是大名鼎鼎的 生产者-消费者模式

该模式能够简化开发过程,一方面消除了生产者类与消费者类之间的代码依赖性,另一方面将生产数据的过程与使用数据的过程解耦简化负载。

自己实现这个模式的时候,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成 重复消费和死锁,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者,这些个等待-唤醒逻辑都需要自己实现。

这么容易出错的事情,JDK当然帮我们做啦,这就是阻塞队列(BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。

BlockingQueue 是 Java util.concurrent 包下重要的数据结构,区别于普通的队列,BlockingQueue 提供了线程安全的队列访问方式,并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。

BlockingQueue 的操作方法

阻塞队列提供了四组不同的方法用于插入、移除、检查元素:

方法\处理方式抛出异常返回特殊值一直阻塞超时退出
插入方法add(e)offer(e)put(e)offer(e,time,unit)
移除方法remove()poll()take()poll(time,unit)
检查方法element()peek()--

抛出异常:如果试图的操作无法立即执行,抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出 IllegalStateException(“Queue full”) 异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。

返回特殊值:如果试图的操作无法立即执行,返回一个特殊值,通常是true / false。

一直阻塞:如果试图的操作无法立即执行,则一直阻塞或者响应中断。

超时退出:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。

注意之处

  • 不能往阻塞队列中插 入null,会抛出空指针异常。
  • 可以访问阻塞队列中的任意元素,调用 remove(o) 可以将队列之中的特定对象移除,但并不高效,尽量避免使用。

BlockingQueue 的实现类

ArrayBlockingQueue

由数组结构组成的有界阻塞队列。内部结构是数组,故具有数组的特性。

public ArrayBlockingQueue(int capacity, boolean fair){
//..省略代码
}

可以初始化队列大小, 且一旦初始化不能改变。构造方法中的 fair 表示控制对象的内部锁是否采用公平锁,默认是 非公平锁

LinkedBlockingQueue

由链表结构组成的有界阻塞队列。内部结构是链表,具有链表的特性。默认队列的大小是 Integer.MAX_VALUE,也可以指定大小。此队列按照先进先出的原则对元素进行排序。

DelayQueue

该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。

DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

PriorityBlockingQueue

基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的 comparator 对象来决定),内部控制线程同步的锁采用的是非公平锁。

public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
this.lock = new ReentrantLock(); //默认构造方法-非公平锁
...//其余代码略
}

PriorityBlockingQueue 不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。对于使用默认大小的 LinkedBlockingQueue 也是一样的。

BlockingQueue 示例和使用场景

生产者-消费者模型

public class Temp {
public static void main(String[] args) {
int queueSize = 10;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);

// 消费者
new Thread(() -> {
while (true) {
try {
queue.take();
System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "consumer").start();

// 生产者
new Thread(() -> {
while (true) {
try {
queue.put(1);
System.out.println("向队列取中插入一个元素,队列剩余空间:" + (queueSize - queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "producer").start();
}
}

注意,这个例子中的输出结果看起来可能有问题,比如有几行在插入一个元素之后,队列的剩余空间不变。这是由于 System.out.println 语句没有锁。

考虑到这样的情况:线程1 在执行完 put/take 操作后立即失去 CPU 时间片,然后切换到线程2 执行 put/take 操作,执行完毕后回到线程1 的 System.out.println 语句并输出,发现这个时候阻塞队列的 size 已经被线程2改变了,所以这个时候输出的 size 并不是当时线程1 执行完 put/take 操作之后阻塞队列的 size,但可以确保的是size不会超过10个。

实际上使用阻塞队列是没有问题的。

线程池中使用阻塞队列

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

Java 中的线程池就是使用阻塞队列实现的